package com.it.process;

import com.it.operator.utils.SourceUtils;
import com.it.pojo.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * watermark的触发会滞后于数据.
 *
 * @author code1997
 */
public class ProcessFunctionTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Event> eventSource = SourceUtils.getEventSource(executionEnvironment)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));
        eventSource.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                if ("tom".equalsIgnoreCase(value.user)) {
                    out.collect(value.user + " clicks " + value.url);
                }

                System.out.println("timestamp::" +ctx.timestamp());
                Thread.sleep(100);
                System.out.println("currentWatermark::"+ctx.timerService().currentWatermark());
                out.collect(value.toString());
            }
        }).print();
        executionEnvironment.execute();

    }
}
